{
    "cells": [
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "# Training and Evaluating Time Series Forcasting Model"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "## Introduction \n",
                "\n",
                "In this notebook, we will develop a program to forcast time series data which has seasonal cycles. We will use [NYC Property Sales data](https://www1.nyc.gov/site/finance/about/open-portal.page) range from 2003 to 2015 published by NYC Department of Finance on the [NYC Open Data Portal](https://opendata.cityofnewyork.us/). \n",
                "\n",
                "The dataset is a record of every building sold in New York City property market during 13-year period. Please refer to [Glossary of Terms for Property Sales Files](https://www1.nyc.gov/assets/finance/downloads/pdf/07pdf/glossary_rsf071607.pdf) for definition of columns in the spreadsheet. The dataset looks like below:\n",
                "\n",
                "|borouge|neighborhood|building_class_category|tax_class|block|lot|eastment|building_class_at_present|address|apartment_number|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|\n",
                "|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|----|\n",
                "|Manhattan|ALPHABET CITY|07  RENTALS - WALKUP APARTMENTS|0.0|384.0|17.0||C4|225 EAST 2ND   STREET||10009.0|10.0|0.0|10.0|2145.0|6670.0|1900.0|2.0|C4|275000.0|2007-06-19|\n",
                "|Manhattan|ALPHABET CITY|07  RENTALS - WALKUP APARTMENTS|2.0|405.0|12.0||C7|508 EAST 12TH   STREET||10009.0|28.0|2.0|30.0|3872.0|15428.0|1930.0|2.0|C7|7794005.0|2007-05-21|\n",
                "\n",
                "We will build up a model to forcast monthly volume of property trade based on history data. In order to forecast, we will use [Facebook Prophet](https://facebook.github.io/prophet/), which provides fast and automated forecast procedure and handles seasonality well."
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "## Install Prophet\n",
                "\n",
                "Let's first install [Facebook Prophet](https://facebook.github.io/prophet/). It uses a decomposable time series model which consist of three main components: trend, seasonality and holidays. \n",
                "For the trend part, Prophet assumes piece-wise constant rate of growth with automatic chagne point selection.\n",
                "For seasonality part, Prophet models weekly and yearly seasonality using Fourier Series. Since we are using monthly data, so we won't have weekly seasonality and won't considering holidays."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "!pip install prophet"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "## Step 1: Load the Data"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### Download dataset and Upload to Lakehouse\n",
                "\n",
                "There are 15 csv files containig property sales records from 5 boroughs in New York since 2003 to 2015. For your convenience, these files are compressed in `nyc_property_sales.tar` and is available in a public blob storage."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "URL = \"https://synapseaisolutionsa.blob.core.windows.net/public/NYC_Property_Sales_Dataset/\"\n",
                "TAR_FILE_NAME = \"nyc_property_sales.tar\"\n",
                "DATA_FOLER = \"Files/NYC_Property_Sales_Dataset\"\n",
                "TAR_FILE_PATH = f\"/lakehouse/default/{DATA_FOLER}/tar/\"\n",
                "CSV_FILE_PATH = f\"/lakehouse/default/{DATA_FOLER}/csv/\""
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "import os\n",
                "\n",
                "if not os.path.exists(\"/lakehouse/default\"):\n",
                "    # ask user to add a lakehouse if no default lakehouse added to the notebook.\n",
                "    # a new notebook will not link to any lakehouse by default.\n",
                "    raise FileNotFoundError(\n",
                "        \"Default lakehouse not found, please add a lakehouse for the notebook.\"\n",
                "    )\n",
                "else:\n",
                "    # check if the needed files are already in the lakehouse, try to download and unzip if not.\n",
                "    if not os.path.exists(f\"{TAR_FILE_PATH}{TAR_FILE_NAME}\"):\n",
                "        os.makedirs(TAR_FILE_PATH, exist_ok=True)\n",
                "        os.system(f\"wget {URL}{TAR_FILE_NAME} -O {TAR_FILE_PATH}{TAR_FILE_NAME}\")\n",
                "\n",
                "    os.makedirs(CSV_FILE_PATH, exist_ok=True)\n",
                "    os.system(f\"tar -zxvf {TAR_FILE_PATH}{TAR_FILE_NAME} -C {CSV_FILE_PATH}\")"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### Create Dataframe from Lakehouse\n",
                "\n",
                "The `display` function print the dataframe and automatically gives chart views."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "df = (\n",
                "    spark.read.format(\"csv\")\n",
                "    .option(\"header\", \"true\")\n",
                "    .load(\"Files/NYC_Property_Sales_Dataset/csv\")\n",
                ")\n",
                "display(df)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "## Step 2: Data Preprocessing"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### Type Conversion and Filtering\n",
                "Lets do some necessary type conversion and filtering.\n",
                "- Need convert sale prices to integers.\n",
                "- Need exclude irregular sales data. For example, a $0 sale indicate ownership transfer without cash consideration.\n",
                "- Exclude building types other than A class.\n",
                "\n",
                "The reason to choose only market of A class building for analysis is that seasonal effect is innegligible coeffient for A class building. The model we will use outperform many others in including seasonality, which is very common needs in time series analysis."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "# import libs\n",
                "import pyspark.sql.functions as F\n",
                "from pyspark.sql.types import *"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "df = df.withColumn(\n",
                "    \"sale_price\", F.regexp_replace(\"sale_price\", \"[$,]\", \"\").cast(IntegerType())\n",
                ")\n",
                "df = df.select(\"*\").where(\n",
                "    'sale_price > 0 and total_units > 0 and gross_square_feet > 0 and building_class_at_time_of_sale like \"A%\"'\n",
                ")"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "monthly_sale_df = df.select(\n",
                "    \"sale_price\",\n",
                "    \"total_units\",\n",
                "    \"gross_square_feet\",\n",
                "    F.date_format(\"sale_date\", \"yyyy-MM\").alias(\"month\"),\n",
                ")"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "display(df)"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "summary_df = (\n",
                "    monthly_sale_df.groupBy(\"month\")\n",
                "    .agg(\n",
                "        F.sum(\"sale_price\").alias(\"total_sales\"),\n",
                "        F.sum(\"total_units\").alias(\"units\"),\n",
                "        F.sum(\"gross_square_feet\").alias(\"square_feet\"),\n",
                "    )\n",
                "    .orderBy(\"month\")\n",
                ")"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "display(summary_df)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### Visualization\n",
                "Now, let's take a look at the trend of property trade trend at NYC. The yearly seasonality is quite clear on the chosen building class. The peek buying seasons are usually spring and fall."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "df_pandas = summary_df.toPandas()"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "import matplotlib.pyplot as plt\n",
                "import seaborn as sns\n",
                "import numpy as np\n",
                "\n",
                "f, (ax1, ax2) = plt.subplots(2, 1, figsize=(35, 10))\n",
                "plt.sca(ax1)\n",
                "plt.xticks(np.arange(0, 15 * 12, step=12))\n",
                "plt.ticklabel_format(style=\"plain\", axis=\"y\")\n",
                "sns.lineplot(x=\"month\", y=\"total_sales\", data=df_pandas)\n",
                "plt.ylabel(\"Total Sales\")\n",
                "plt.xlabel(\"Time\")\n",
                "plt.title(\"Total Property Sales by Month\")\n",
                "\n",
                "plt.sca(ax2)\n",
                "plt.xticks(np.arange(0, 15 * 12, step=12))\n",
                "plt.ticklabel_format(style=\"plain\", axis=\"y\")\n",
                "sns.lineplot(x=\"month\", y=\"square_feet\", data=df_pandas)\n",
                "plt.ylabel(\"Total Square Feet\")\n",
                "plt.xlabel(\"Time\")\n",
                "plt.title(\"Total Property Square Feet Sold by Month\")\n",
                "plt.show()"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "## Step 3: Model Training and Evaluation"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### Model Fitting\n",
                "\n",
                "To do model fitting, we just need to rename the time axis to 'ds' and value axis to 'y'"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "import pandas as pd\n",
                "\n",
                "df_pandas[\"ds\"] = pd.to_datetime(df_pandas[\"month\"])\n",
                "df_pandas[\"y\"] = df_pandas[\"total_sales\"]"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "Now let's fit the model. We will choose to use 'multiplicative' seasonality, it means seasonality is no longer a constant additive factor like defualt assumed by Prophet. As you can see a in previous cell, we printed out the total property sale data per month, and the vibration amplitude is not consistant. It means using simple additive seasonlity won't fit the data well.\n",
                "In addition, we will use Markov Chain Monte Carlo(MCMC) that gives mean of posteriori distribution. By default, Prophet uses Stan's L-BFGS to fit the model, which find a maximum a posteriori probability(MAP) estimate.\n"
            ]
        },
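        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "The difference between additive and multiplicative seasonality described above can be sketched on synthetic data. This is only an illustration, not Prophet's implementation: the series `trend` and `seasonal`, the 20% amplitude, and the helper `yearly_swing` are all made up for this example and do not come from the NYC dataset.\n",
                "\n",
                "```python\n",
                "import math\n",
                "\n",
                "# Synthetic illustration only: 10 years of monthly points with a rising trend.\n",
                "months = range(120)\n",
                "trend = [100.0 + 2.0 * m for m in months]\n",
                "# Relative yearly pattern with a 12-month cycle, between -20% and +20%.\n",
                "seasonal = [0.2 * math.sin(2 * math.pi * m / 12) for m in months]\n",
                "\n",
                "# Additive: the seasonal effect has a fixed size, independent of the trend.\n",
                "additive_effect = [100.0 * s for s in seasonal]\n",
                "# Multiplicative: the seasonal effect is a fraction of the current trend level.\n",
                "multiplicative_effect = [t * s for t, s in zip(trend, seasonal)]\n",
                "\n",
                "\n",
                "def yearly_swing(effect, year):\n",
                "    # Peak-to-trough range of the seasonal effect within one 12-month window.\n",
                "    window = effect[12 * year : 12 * (year + 1)]\n",
                "    return max(window) - min(window)\n",
                "\n",
                "\n",
                "print('additive, first vs last year:', yearly_swing(additive_effect, 0), yearly_swing(additive_effect, 9))\n",
                "print('multiplicative, first vs last year:', yearly_swing(multiplicative_effect, 0), yearly_swing(multiplicative_effect, 9))\n",
                "```\n",
                "\n",
                "In this sketch the additive swing stays the same from the first year to the last, while the multiplicative swing grows with the trend. That growing amplitude is the pattern that motivates `seasonality_mode` being set to multiplicative here."
            ]
        },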
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "from prophet import Prophet\n",
                "from prophet.plot import add_changepoints_to_plot\n",
                "\n",
                "m = Prophet(\n",
                "    seasonality_mode=\"multiplicative\", weekly_seasonality=False, mcmc_samples=1000\n",
                ")\n",
                "m.fit(df_pandas)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "Let's use bulit-in functions in Prophet to show the model fitting results. The black dots are data points used to train the model. The blue line is the prediction and the light blue area shows uncertainty intervals."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "future = m.make_future_dataframe(periods=12, freq=\"M\")\n",
                "forecast = m.predict(future)\n",
                "fig = m.plot(forecast)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "Prophet assumes piece-wise constant growth, thus you can plot the change points of the trained model"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "fig = m.plot(forecast)\n",
                "a = add_changepoints_to_plot(fig.gca(), m, forecast)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "Visualize trend and yearly seasonality. The light blue area reflects uncertainty."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "fig2 = m.plot_components(forecast)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### Cross Validation"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "We can use Prophet's built in cross validation functionality to measure the forecast error on historical data. The below parameters means starting with 11 years of training data, then making predictions every 30 days within 1 year horizon."
            ]
        },
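        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "To see roughly what these parameters imply, the sketch below lays out rolling-origin cutoff dates with the standard library only. It is an approximation for intuition, not Prophet's actual code: `sketch_cutoffs` is a hypothetical helper, the date range (monthly points from 2003-01-01 through 2015-12-01) is an assumption about the aggregated data, and 11 years is approximated as 11 * 365 days.\n",
                "\n",
                "```python\n",
                "from datetime import date, timedelta\n",
                "\n",
                "\n",
                "def sketch_cutoffs(first_date, last_date, initial, period, horizon):\n",
                "    # Hypothetical helper, not part of Prophet: walk backward from the last\n",
                "    # possible cutoff while at least the initial training span precedes it.\n",
                "    cutoff = last_date - horizon\n",
                "    earliest = first_date + initial\n",
                "    cutoffs = []\n",
                "    while cutoff >= earliest:\n",
                "        cutoffs.append(cutoff)\n",
                "        cutoff -= period\n",
                "    return sorted(cutoffs)\n",
                "\n",
                "\n",
                "# Assumed date range: monthly points from 2003-01-01 through 2015-12-01.\n",
                "cutoffs = sketch_cutoffs(\n",
                "    first_date=date(2003, 1, 1),\n",
                "    last_date=date(2015, 12, 1),\n",
                "    initial=timedelta(days=11 * 365),\n",
                "    period=timedelta(days=30),\n",
                "    horizon=timedelta(days=365),\n",
                ")\n",
                "print(len(cutoffs), 'cutoffs from', cutoffs[0], 'to', cutoffs[-1])\n",
                "```\n",
                "\n",
                "Each cutoff marks the end of one training window. The model is refit on the data up to that date and evaluated on the following horizon, so only a limited number of folds fit into the last two years of the series."
            ]
        },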
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "from prophet.diagnostics import cross_validation\n",
                "from prophet.diagnostics import performance_metrics\n",
                "\n",
                "df_cv = cross_validation(m, initial=\"11 Y\", period=\"30 days\", horizon=\"365 days\")\n",
                "df_p = performance_metrics(df_cv, monthly=True)"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "display(df_p)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "## Step 4: Log and Load Model with MLFlow\n",
                "We can now store the trained model for later use."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "# setup mlflow\n",
                "import mlflow\n",
                "import trident.mlflow\n",
                "from trident.mlflow import get_sds_url\n",
                "\n",
                "EXPERIMENT_NAME = \"aisample-timeseries\"\n",
                "\n",
                "mlflow.set_tracking_uri(get_sds_url())\n",
                "mlflow.set_registry_uri(get_sds_url())\n",
                "mlflow.set_experiment(EXPERIMENT_NAME)"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "# log the model and parameters\n",
                "model_name = f\"{EXPERIMENT_NAME}-prophet\"\n",
                "with mlflow.start_run() as run:\n",
                "    mlflow.prophet.log_model(m, model_name, registered_model_name=model_name)\n",
                "    mlflow.log_params({\"seasonality_mode\": \"multiplicative\", \"mcmc_samples\": 1000})\n",
                "    model_uri = f\"runs:/{run.info.run_id}/{model_name}\"\n",
                "    print(\"Model saved in run %s\" % run.info.run_id)\n",
                "    print(f\"Model URI: {model_uri}\")"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "# load the model back\n",
                "loaded_model = mlflow.prophet.load_model(model_uri)"
            ]
        }
    ],
    "metadata": {
        "kernelspec": {
            "display_name": "Python 3.8.13",
            "language": "python",
            "name": "python3"
        },
        "language_info": {
            "name": "python",
            "version": "3.8.13"
        },
        "orig_nbformat": 4,
        "vscode": {
            "interpreter": {
                "hash": "0c3c2e25a81c246d04d78d3cc407085024aed97ce8aa6c9f7fe4d74e99285c0c"
            }
        }
    },
    "nbformat": 4,
    "nbformat_minor": 0
}
